{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "# Part I. ETL Pipeline for Pre-Processing the Files"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "#### Import Python packages "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {
    "editable": true
   },
   "outputs": [],
   "source": [
    "# Import Python packages \n",
    "import pandas as pd\n",
    "import cassandra\n",
    "import re\n",
    "import os\n",
    "import glob\n",
    "import numpy as np\n",
    "import json\n",
    "import csv"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "#### Creating list of filepaths to process original event csv data files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {
    "editable": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Current working directory : /home/workspace\n"
     ]
    }
   ],
   "source": [
    "# checking current working directory\n",
    "print(f\"Current working directory : {os.getcwd()}\")\n",
    "\n",
    "# Get current folder and subfolder event data\n",
    "filepath = os.getcwd() + '/event_data'\n",
    "\n",
    "# Create a list of files and collect each filepath\n",
    "for root, dirs, files in os.walk(filepath):\n",
    "    \n",
    "# join the file path and roots with the subdirectories using glob\n",
    "    file_path_list = glob.glob(os.path.join(root,'*'))\n",
    "    #print(file_path_list)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "#### Processing the files to create the data file csv that will be used for Apache Casssandra tables"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {
    "editable": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Total rows : 8056\n",
      "Sample data:\n",
      " [['Stephen Lynch', 'Logged In', 'Jayden', 'M', '0', 'Bell', '182.85669', 'free', 'Dallas-Fort Worth-Arlington, TX', 'PUT', 'NextSong', '1.54099E+12', '829', \"Jim Henson's Dead\", '200', '1.54354E+12', '91'], ['Manowar', 'Logged In', 'Jacob', 'M', '0', 'Klein', '247.562', 'paid', 'Tampa-St. Petersburg-Clearwater, FL', 'PUT', 'NextSong', '1.54056E+12', '1049', 'Shell Shock', '200', '1.54354E+12', '73'], ['Morcheeba', 'Logged In', 'Jacob', 'M', '1', 'Klein', '257.41016', 'paid', 'Tampa-St. Petersburg-Clearwater, FL', 'PUT', 'NextSong', '1.54056E+12', '1049', 'Women Lose Weight (Feat: Slick Rick)', '200', '1.54354E+12', '73'], ['Maroon 5', 'Logged In', 'Jacob', 'M', '2', 'Klein', '231.23546', 'paid', 'Tampa-St. Petersburg-Clearwater, FL', 'PUT', 'NextSong', '1.54056E+12', '1049', \"Won't Go Home Without You\", '200', '1.54354E+12', '73'], ['Train', 'Logged In', 'Jacob', 'M', '3', 'Klein', '216.76363', 'paid', 'Tampa-St. Petersburg-Clearwater, FL', 'PUT', 'NextSong', '1.54056E+12', '1049', 'Hey_ Soul Sister', '200', '1.54354E+12', '73']]\n"
     ]
    }
   ],
   "source": [
    "# initiating an empty list of rows that will be generated from each file\n",
    "full_data_rows_list = [] \n",
    "    \n",
    "# for every filepath in the file path list \n",
    "for f in file_path_list:\n",
    "\n",
    "# reading csv file \n",
    "    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: \n",
    "        # creating a csv reader object \n",
    "        csvreader = csv.reader(csvfile) \n",
    "        next(csvreader)\n",
    "        \n",
    " # extracting each data row one by one and append it        \n",
    "        for line in csvreader:\n",
    "            full_data_rows_list.append(line) \n",
    "            \n",
    "\n",
    "print(f\"Total rows : {len(full_data_rows_list)}\")\n",
    "print(f\"Sample data:\\n {full_data_rows_list[:5]}\")\n",
    "\n",
    "# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \\\n",
    "# Apache Cassandra tables\n",
    "csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)\n",
    "\n",
    "with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:\n",
    "    writer = csv.writer(f, dialect='myDialect')\n",
    "    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\\\n",
    "                'level','location','sessionId','song','userId'])\n",
    "    for row in full_data_rows_list:\n",
    "        if (row[0] == ''):\n",
    "            continue\n",
    "        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {
    "editable": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "6821\n"
     ]
    }
   ],
   "source": [
    "# checking the number of rows in new event csv file\n",
    "with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:\n",
    "    print(sum(1 for line in f))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "## Now we are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: \n",
    "- artist \n",
    "- firstName of user\n",
    "- gender of user\n",
    "- item number in session\n",
    "- last name of user\n",
    "- length of the song\n",
    "- level (paid or free song)\n",
    "- location of the user\n",
    "- sessionId\n",
    "- song title\n",
    "- userId\n",
    "\n",
    "The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>\n",
    "\n",
    "<img src=\"images/image_event_datafile_new.jpg\">"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "#### Creating a Cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {
    "editable": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Connection Established !!\n"
     ]
    }
   ],
   "source": [
    "# This should make a connection to a Cassandra instance your local machine \n",
    "# (127.0.0.1)\n",
    "\n",
    "from cassandra.cluster import Cluster\n",
    "try:\n",
    "    cluster = Cluster(['127.0.0.1'])\n",
    "    session = cluster.connect()\n",
    "    print(\"Connection Established !!\")\n",
    "except Exception as e:\n",
    "    print(f\"Connection Failed !! Error : {e}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "#### Creating Keyspace"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {
    "editable": true
   },
   "outputs": [],
   "source": [
    "keyspace_query = \"\"\"CREATE KEYSPACE IF NOT EXISTS sparkify \n",
    "                    with REPLICATION = \n",
    "                    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }\n",
    "                \"\"\"\n",
    "\n",
    "# Creating Keyspace\n",
    "try:\n",
    "    session.execute(keyspace_query)\n",
    "except Exception as e:\n",
    "    print(f\"Failed to create keyspace!! Error : {e}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "#### Setting Keyspace"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {
    "editable": true
   },
   "outputs": [],
   "source": [
    "# Setting KEYSPACE to the keyspace specified above\n",
    "session.set_keyspace('sparkify')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "### Now we need to create tables to run the following queries. Remember, with Apache Cassandra we model the database tables on the queries we want to run."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "## Below are the queries following which we will build out data model\n",
    "\n",
    "### 1. Give the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4\n",
    "\n",
    "\n",
    "### 2. Give only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182\n",
    "    \n",
    "\n",
    "### 3. Give every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'\n",
    "\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "# ========================================================================================================"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "## Query 1 \n",
    "\n",
    "### For query 1, we need a way to run query on sessionId and itemInSession. So, our primary key must have these columns. We can partition the data on sessionId.\n",
    "\n",
    "### Our Select query : SELECT artist, song, length FROM  session_item where sessionId = 338 and itemInSession = 4\n",
    "### Our Primary key will be (sessionId, itemInSession), where sessionId is the partition key and  itemInSession is the clustering column.\n",
    "### Columns we included in the table : "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {
    "editable": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Table Created!!\n"
     ]
    }
   ],
   "source": [
    "# Creating table for query1 \n",
    "create_query1 = \"\"\"CREATE TABLE IF NOT EXISTS session_item (artist text, song text, length float, sessionId int, itemInSession int, PRIMARY KEY (sessionId, itemInSession))\"\"\"\n",
    "\n",
    "try: \n",
    "    session.execute(create_query1)\n",
    "    print(\"Table Created!!\")\n",
    "except Exception as e:\n",
    "    print(f\"Table creation failed!! Error : {e}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {
    "editable": true
   },
   "outputs": [],
   "source": [
    "# Using the event file\n",
    "file = 'event_datafile_new.csv'\n",
    "\n",
    "# Reading csv file and inserting rows into cassandra tables.\n",
    "with open(file, encoding = 'utf8') as f:\n",
    "    csvreader = csv.reader(f)\n",
    "    next(csvreader) # skip header\n",
    "    for line in csvreader:\n",
    "        query = \"INSERT INTO session_item (artist, song, length, sessionId, itemInSession) \"\n",
    "        query = query + \" VALUES (%s, %s, %s, %s, %s) \"\n",
    "        session.execute(query, (line[0], line[10], float(line[5]), int(line[8]), int(line[3])) )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "#### Do a SELECT to verify that the data have been inserted into each table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {
    "editable": true,
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Row(artist='Faithless', song='50', length=495.30731201171875)\n"
     ]
    }
   ],
   "source": [
    "# SELECT statement to verify the data was entered into the table\n",
    "select_query1 = \"SELECT artist, song, length FROM  session_item where sessionId = 338 and itemInSession = 4\"\n",
    "try:\n",
    "    rows = session.execute(select_query1)\n",
    "except Exception as e:\n",
    "    print(e)\n",
    "    \n",
    "for row in rows:\n",
    "    print(row)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "## Query 2\n",
    "\n",
    "### For query 2, we need a way to run query on sessionId and userId. Also, we need the data sorted on itemInSession. So, our primary key must have these columns. We can partition the data on a composite key (sessionId, userId).\n",
    "\n",
    "### Our Select query : SELECT artist, song, firstName, lastName FROM  user_session where sessionId = 182 and userId = 10\n",
    "### Our Primary key will be ((sessionId, userId), itemInSession)), where (sessionId, userId) is the partition key and  itemInSession is the clustering column.\n",
    "### Also, we are using the clause - WITH CLUSTERING ORDER BY (itemInSession ASC), to sort our data based on itemInSession\n",
    "### Columns we included in the table : sessionId, userId, artist, song, firstName, lastName, itemInSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "metadata": {
    "editable": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Table Created!!\n"
     ]
    }
   ],
   "source": [
    "# Creating table for query2 \n",
    "create_query2 = \"\"\"CREATE TABLE IF NOT EXISTS user_session (sessionId int, userId int, artist text, song text, firstName text, lastName text, itemInSession int, PRIMARY KEY ((sessionId, userId), itemInSession)) WITH CLUSTERING ORDER BY (itemInSession ASC) \"\"\"\n",
    "\n",
    "try: \n",
    "    session.execute(create_query2)\n",
    "    print(\"Table Created!!\")\n",
    "except Exception as e:\n",
    "    print(f\"Table creation failed!! Error : {e}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "metadata": {
    "editable": true
   },
   "outputs": [],
   "source": [
    "file = 'event_datafile_new.csv'\n",
    "\n",
    "with open(file, encoding = 'utf8') as f:\n",
    "    csvreader = csv.reader(f)\n",
    "    next(csvreader) # skip header\n",
    "    for line in csvreader:\n",
    "        query = \"INSERT INTO user_session (sessionId, userId, artist, song, firstName, lastName, itemInSession) \"\n",
    "        query = query + \" VALUES (%s, %s, %s, %s, %s, %s, %s) \"\n",
    "        session.execute(query, (int(line[8]), int(line[10]), line[0], line[9], line[1], line[4], int(line[3])  ) )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "metadata": {
    "editable": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Row(artist='Down To The Bone', song=\"Keep On Keepin' On\", firstname='Sylvie', lastname='Cruz')\n",
      "Row(artist='Three Drives', song='Greece 2000', firstname='Sylvie', lastname='Cruz')\n",
      "Row(artist='Sebastien Tellier', song='Kilometer', firstname='Sylvie', lastname='Cruz')\n",
      "Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz')\n"
     ]
    }
   ],
   "source": [
    "# SELECT statement to verify the data was entered into the table\n",
    "select_query2 = \"SELECT artist, song, firstName, lastName FROM  user_session where sessionId = 182 and userId = 10\"\n",
    "try:\n",
    "    rows = session.execute(select_query2)\n",
    "except Exception as e:\n",
    "    print(e)\n",
    "\n",
    "for row in rows:\n",
    "    print(row)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "## Query 3\n",
    "\n",
    "### For query 3, we need a way to run query on song. So, our primary key must have song. Also, the query should be such that it does not contain duplicate users for a song. So we need to model data in such a way that we don't allow duplicate users for a song in our table. This can be acheived by including userId in our primary key.\n",
    "\n",
    "### Our Select query : SELECT song, firstName, lastName FROM user_song where song = 'All Hands Against His Own'\n",
    "### Our Primary key will be ((song), userId)), where song is the partition key and  userId is the clustering column.\n",
    "### Columns we included in the table : song, userId, firstName, lastName"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "metadata": {
    "editable": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Table Created!!\n"
     ]
    }
   ],
   "source": [
    "# Creating table for query3\n",
    "\n",
    "create_query3 = \"\"\"CREATE TABLE IF NOT EXISTS user_song (song text, userId int, firstName text, lastName text, PRIMARY KEY ((song), userId))\"\"\"\n",
    "\n",
    "try: \n",
    "    session.execute(create_query3)\n",
    "    print(\"Table Created!!\")\n",
    "except Exception as e:\n",
    "    print(f\"Table creation failed!! Error : {e}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "metadata": {
    "editable": true
   },
   "outputs": [],
   "source": [
    "file = 'event_datafile_new.csv'\n",
    "\n",
    "with open(file, encoding = 'utf8') as f:\n",
    "    csvreader = csv.reader(f)\n",
    "    next(csvreader) # skip header\n",
    "    for line in csvreader:\n",
    "        query = \"INSERT INTO user_song (song, userId, firstName, lastName) \"\n",
    "        query = query + \" VALUES (%s, %s, %s, %s) \"\n",
    "        session.execute(query, (  line[9], int(line[10]), line[1], line[4] )  )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "metadata": {
    "editable": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Row(song='All Hands Against His Own', firstname='Jacqueline', lastname='Lynch')\n",
      "Row(song='All Hands Against His Own', firstname='Tegan', lastname='Levine')\n",
      "Row(song='All Hands Against His Own', firstname='Sara', lastname='Johnson')\n"
     ]
    }
   ],
   "source": [
    "# SELECT statement to verify the data was entered into the table\n",
    "select_query2 = \"SELECT song, firstName, lastName FROM user_song where song = 'All Hands Against His Own'\"\n",
    "try:\n",
    "    rows = session.execute(select_query2)\n",
    "except Exception as e:\n",
    "    print(e)\n",
    "\n",
    "for row in rows:\n",
    "    print(row)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "### Drop the tables before closing out the sessions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 52,
   "metadata": {
    "editable": true
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<cassandra.cluster.ResultSet at 0x7f58042f1d68>"
      ]
     },
     "execution_count": 52,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "session.execute(\"DROP TABLE IF EXISTS sparkify.session_item\")\n",
    "session.execute(\"DROP TABLE IF EXISTS sparkify.user_session\")\n",
    "session.execute(\"DROP TABLE IF EXISTS sparkify.user_song\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true
   },
   "source": [
    "### Close the session and cluster connection¶"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 53,
   "metadata": {
    "editable": true
   },
   "outputs": [],
   "source": [
    "session.shutdown()\n",
    "cluster.shutdown()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "editable": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
